<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
</head>
<body>

                    <h4>多进程</h4>
                    <div class="x-wiki-info"><span>1445次阅读</span></div>
                    <hr style="border-top-color:#ccc" />
                    <div class="x-wiki-content x-content"><p>要让Python程序实现多进程（multiprocessing），我们先了解操作系统的相关知识。</p>
<p>Unix/Linux操作系统提供了一个<code>fork()</code>系统调用，它非常特殊。普通的函数调用，调用一次，返回一次，但是<code>fork()</code>调用一次，返回两次，因为操作系统自动把当前进程（称为父进程）复制了一份（称为子进程），然后，分别在父进程和子进程内返回。</p>
<p>子进程永远返回<code>0</code>，而父进程返回子进程的ID。这样做的理由是，一个父进程可以fork出很多子进程，所以，父进程要记下每个子进程的ID，而子进程只需要调用<code>getppid()</code>就可以拿到父进程的ID。</p>
<p>Python的<code>os</code>模块封装了常见的系统调用，其中就包括<code>fork</code>，可以在Python程序中轻松创建子进程：</p>
<pre><code># multiprocessing.py
import os

print &#39;Process (%s) start...&#39; % os.getpid()
pid = os.fork()
if pid==0:
    print &#39;I am child process (%s) and my parent is %s.&#39; % (os.getpid(), os.getppid())
else:
    print &#39;I (%s) just created a child process (%s).&#39; % (os.getpid(), pid)
</code></pre><p>运行结果如下：</p>
<pre><code>Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
</code></pre><p>由于Windows没有<code>fork</code>调用，上面的代码在Windows上无法运行。由于Mac系统是基于BSD（Unix的一种）内核，所以，在Mac下运行是没有问题的，推荐大家用Mac学Python！</p>
<p>有了<code>fork</code>调用，一个进程在接到新任务时就可以复制出一个子进程来处理新任务，常见的Apache服务器就是由父进程监听端口，每当有新的http请求时，就fork出子进程来处理新的http请求。</p>
<h3 id="multiprocessing">multiprocessing</h3>
<p>如果你打算编写多进程的服务程序，Unix/Linux无疑是正确的选择。由于Windows没有<code>fork</code>调用，难道在Windows上无法用Python编写多进程的程序？</p>
<p>由于Python是跨平台的，自然也应该提供一个跨平台的多进程支持。<code>multiprocessing</code>模块就是跨平台版本的多进程模块。</p>
<p><code>multiprocessing</code>模块提供了一个<code>Process</code>类来代表一个进程对象，下面的例子演示了启动一个子进程并等待其结束：</p>
<pre><code>from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
    print &#39;Run child process %s (%s)...&#39; % (name, os.getpid())

if __name__==&#39;__main__&#39;:
    print &#39;Parent process %s.&#39; % os.getpid()
    p = Process(target=run_proc, args=(&#39;test&#39;,))
    print &#39;Process will start.&#39;
    p.start()
    p.join()
    print &#39;Process end.&#39;
</code></pre><p>执行结果如下：</p>
<pre><code>Parent process 928.
Process will start.
Run child process test (929)...
Process end.
</code></pre><p>创建子进程时，只需要传入一个执行函数和函数的参数，创建一个<code>Process</code>实例，用<code>start()</code>方法启动，这样创建进程比<code>fork()</code>还要简单。</p>
<p><code>join()</code>方法可以等待子进程结束后再继续往下运行，通常用于进程间的同步。</p>
<h3 id="pool">Pool</h3>
<p>如果要启动大量的子进程，可以用进程池的方式批量创建子进程：</p>
<pre><code>from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print &#39;Run task %s (%s)...&#39; % (name, os.getpid())
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print &#39;Task %s runs %0.2f seconds.&#39; % (name, (end - start))

if __name__==&#39;__main__&#39;:
    print &#39;Parent process %s.&#39; % os.getpid()
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print &#39;Waiting for all subprocesses done...&#39;
    p.close()
    p.join()
    print &#39;All subprocesses done.&#39;
</code></pre><p>执行结果如下：</p>
<pre><code>Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.
</code></pre><p>代码解读：</p>
<p>对<code>Pool</code>对象调用<code>join()</code>方法会等待所有子进程执行完毕，调用<code>join()</code>之前必须先调用<code>close()</code>，调用<code>close()</code>之后就不能继续添加新的<code>Process</code>了。</p>
<p>请注意输出的结果，task <code>0</code>，<code>1</code>，<code>2</code>，<code>3</code>是立刻执行的，而task <code>4</code>要等待前面某个task完成后才执行，这是因为<code>Pool</code>的默认大小在我的电脑上是4，因此，最多同时执行4个进程。这是<code>Pool</code>有意设计的限制，并不是操作系统的限制。如果改成：</p>
<pre><code>p = Pool(5)
</code></pre><p>就可以同时跑5个进程。</p>
<p>由于<code>Pool</code>的默认大小是CPU的核数，如果你不幸拥有8核CPU，你要提交至少9个子进程才能看到上面的等待效果。</p>
<h3 id="-">进程间通信</h3>
<p><code>Process</code>之间肯定是需要通信的，操作系统提供了很多机制来实现进程间的通信。Python的<code>multiprocessing</code>模块包装了底层的机制，提供了<code>Queue</code>、<code>Pipes</code>等多种方式来交换数据。</p>
<p>我们以<code>Queue</code>为例，在父进程中创建两个子进程，一个往<code>Queue</code>里写数据，一个从<code>Queue</code>里读数据：</p>
<pre><code>from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    for value in [&#39;A&#39;, &#39;B&#39;, &#39;C&#39;]:
        print &#39;Put %s to queue...&#39; % value
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    while True:
        value = q.get(True)
        print &#39;Get %s from queue.&#39; % value

if __name__==&#39;__main__&#39;:
    # 父进程创建Queue，并传给各个子进程：
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw，写入:
    pw.start()
    # 启动子进程pr，读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环，无法等待其结束，只能强行终止:
    pr.terminate()
</code></pre><p>运行结果如下：</p>
<pre><code>Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
</code></pre><p>在Unix/Linux下，<code>multiprocessing</code>模块封装了<code>fork()</code>调用，使我们不需要关注<code>fork()</code>的细节。由于Windows没有<code>fork</code>调用，因此，<code>multiprocessing</code>需要“模拟”出<code>fork</code>的效果，父进程所有Python对象都必须通过pickle序列化再传到子进程去，所有，如果<code>multiprocessing</code>在Windows下调用失败了，要先考虑是不是pickle失败了。</p>
<h3 id="-">小结</h3>
<p>在Unix/Linux下，可以使用<code>fork()</code>调用实现多进程。</p>
<p>要实现跨平台的多进程，可以使用<code>multiprocessing</code>模块。</p>
<p>进程间通信是通过<code>Queue</code>、<code>Pipes</code>等实现的。</p>
</div>

                    <hr style="border-top-color:#ccc" />

                    